SQL 连接器

您所在的位置:网站首页 mongodb connector bi SQL 连接器

SQL 连接器

#SQL 连接器| 来源: 网络整理| 查看: 265

RocketmqFlink-1.10Rocketmq Source简述示例With 参数Rocketmq Sink简述示例With 参数FLINK-1.12RocketMq sourceRocketMq sink属性支持的metadata列with属性Rocketmq

Easystream 支持使用 Rocketmq 作为输入/输出数据源。

Flink-1.10Rocketmq Source简述

Easystream 支持使用 Rocketmq 作为输入数据源。

示例CREATE TABLE source ( `key` varchar, `value` VARCHAR ) WITH ( 'connector.type' = 'rocketmq', 'nameserver.address' = 'xxx:9876;xxx:9876', 'group' = 'rocketmqTest', 'topic' = 'rocketmqTest', 'brokerserver.heartbeat.interval' = '30000', 'tag' = '*', 'offset.persist.interval' = '5000', 'batch.size' = '256', 'delay.when.message.not.found'='10', 'offset.reset' = 'earlist' -- 'consumer.offset.from.timestamp'='1577253600000' ); With 参数 参数 注释说明 备注 connector.type 数据源类型 必填,'rocketmq',小写 nameserver.address rocketmq 服务地址 必填 group 消费者组 (consumerGroup) 必填 topic 读取的 topic 必填 offset.reset offset 设置的类型:earliest、latest、timestamp。如果设置为 timestamp,则必须指定参数 offset.from.timestamp 必填 offset.from.timestamp 如果 offset=timestamp,则此值需要填一个13位的毫秒时间 选填 brokerserver.heartbeat.interval 与 brokerserver 的心跳间隔 选填,默认是 30000ms tag 读取的tag 选填,默认为'*',全部tag offset.persist.interval 不开启 checkpoint 下,offset 提交的时间间隔 选填,默认 5000ms batch.size 每次拉取的最大数据条数 选填,默认256条 delay.when.message.not.found 当没有获取到数据时的等待时间 选填,默认 10ms Rocketmq Sink简述

Easystream 支持输出到 Rocketmq。

示例create table sink( `key` varchar, `value` VARCHAR ) with( 'connector.type' = 'rocketmq', 'nameserver.address' = 'xxx:9876;xxx:9876', 'group' = 'sloth-test-sink-0117-01', 'topic' = 'sloth-test-0117-01', 'brokerserver.heartbeat.interval' = '30000', 'async' = 'true', 'retry.times' = '3', 'timeout' = '3000', 'flash.on.checkpoint' = 'true', 'msg.delay.level' = '0' ); With 参数 参数 注释说明 备注 connector.type 数据源类型 必填,'rocketmq',小写 nameserver.address rocketmq 服务地址 必填 group 写入的组 必填 topic 写入的 topic 必填 brokerserver.heartbeat.interval 与 brokerserver 的心跳间隔 选填,默认是 30000ms async 是否开启异步写入 选填,默认为 false retry.times async=true 时的重试次数 选填,默认3次 flash.on.checkpoint 是否每次打 checkpoint 时才刷新数据到目标端 选填,默认为 false msg.delay.level 数据的发送级别 选填,默认为0 FLINK-1.12RocketMq sourceCREATE TABLE rocketmq_source ( `surname` STRING, `givenName` STRING ) WITH ( 'connector' = 'rocketmq', 'topic' = 'sloth-test-0601', 'group' = 'behavior_consume_group', 'name.server.address' = 'sloth-test1.dg.163.org:9876;sloth-test2.dg.163.org:9876' );RocketMq sinkCREATE TABLE rocketmq_sink ( `surname` STRING, `givenName` STRING ) WITH ( 'connector' = 'rocketmq', 'topic' = 'sloth-test-0601-sink', 'group' = 'behavior_produce_group', 'name.server.address' = 'sloth-test1.dg.163.org:9876;sloth-test2.dg.163.org:9876', 'key.columns' = 'surname', 'write.keys.to.body' = 'true' );属性支持的metadata列 参数 注释说明 备注 topic topic名 只读列 with属性 参数 注释说明 备注 connector 数据源类型 必填,'rocketmq',小写 name.server.address rocketmq 服务地址 必填,可用于source或sink group 写入的组 必填,可用于source或sink topic 写入的 topic 必填,可用于source或sink key.columns 键列 只能用于sink write.keys.to.body 是否将键写到body 只能用于sink


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3